package com.bjy.qa.util.mqtt;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.bjy.qa.dao.functionaltest.AgentDao;
import com.bjy.qa.dao.functionaltest.ApiLogDao;
import com.bjy.qa.entity.functionaltest.ApiLog;
import com.bjy.qa.enumtype.AgentStatus;
import com.bjy.qa.exception.MyException;
import org.apache.commons.lang.StringUtils;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Starts the server-side MQTT integration once the Spring Boot application is up.
 *
 * <p>On startup ({@link #run(ApplicationArguments)}) this component:
 * <ol>
 *   <li>subscribes to the broker's {@code $SYS} client topics and updates agent status through
 *       {@link AgentDao} when a client connects or disconnects;</li>
 *   <li>subscribes to {@code <base-topic>/UP} and stores reported {@code step} records through
 *       {@link ApiLogDao};</li>
 *   <li>marks every agent offline and then marks the ones the broker HTTP API reports as
 *       connected online again.</li>
 * </ol>
 *
 * <p>It also exposes {@link #pubCmd(String, String, String)} for publishing commands to agents.
 *
 * <p>The connection settings are kept in static fields (populated through instance setters
 * annotated with {@code @Value}) because the static {@code pubCmd} needs them.
 */
@Component
public class MqttLaunchTool implements ApplicationRunner {
    private static final Logger logger = LoggerFactory.getLogger(MqttLaunchTool.class);

    /** Extracts the agent key from a client id shaped like {@code SUB:<agentKey>_<suffix>}. */
    private static final Pattern AGENT_CLIENT_ID_PATTERN = Pattern.compile("SUB:(.+)_\\w+");

    /**
     * Publishing client per agent key, reused by {@link #pubCmd(String, String, String)}.
     *
     * <p>NOTE(review): this is a plain {@link HashMap} mutated from a public static method;
     * confirm that callers do not invoke {@code pubCmd} concurrently, or switch to a
     * concurrent map.
     */
    private static final Map<String, MqttHelper> agentMqttHelperMap = new HashMap<>();

    @Resource
    AgentDao agentDao;

    @Resource
    ApiLogDao apiLogDao;

    private static String mqttUrl;

    @Value("${qa-platform.mqtt.url}")
    public void setMqttUrl(String mqttUrl) {
        MqttLaunchTool.mqttUrl = mqttUrl;
    }

    private static String mqttQos;

    @Value("${qa-platform.mqtt.qos}")
    public void setMqttQos(String mqttQos) {
        MqttLaunchTool.mqttQos = mqttQos;
    }

    private static Boolean mqttAllowAnonymous;

    @Value("${qa-platform.mqtt.allow-anonymous:false}")
    public void setMqttAllowAnonymous(Boolean mqttAllowAnonymous) {
        MqttLaunchTool.mqttAllowAnonymous = mqttAllowAnonymous;
    }

    private static String mqttUserName;

    @Value("${qa-platform.mqtt.username}")
    public void setMqttUserName(String mqttUserName) {
        MqttLaunchTool.mqttUserName = mqttUserName;
    }

    private static String mqttPassword;

    @Value("${qa-platform.mqtt.password}")
    public void setMqttPassword(String mqttPassword) {
        MqttLaunchTool.mqttPassword = mqttPassword;
    }

    @Value("${qa-platform.mqtt.api-key}")
    private String mqttApiKey;

    @Value("${qa-platform.mqtt.secret-key}")
    private String mqttSecretKey;

    @Value("${qa-platform.mqtt.api-get-clients-url}")
    private String mqttApiGetClientsUrl;

    @Value("${qa-platform.mqtt.base-topic}")
    private String mqttBaseTopic;

    /**
     * Sets up both subscriptions and then synchronizes the persisted agent status.
     *
     * @param args application arguments (unused)
     * @throws Exception if a subscription cannot be established or the status sync fails
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        monitoringDeviceState(); // listen for clients going online / offline
        monitoringUpData(); // listen for data reported by agents
        syncAgentStatus(); // reconcile persisted agent status with the broker
    }

    /**
     * Subscribes to the broker's client state topics and mirrors connect / disconnect events
     * into the agent status.
     *
     * @throws MqttException if connecting or subscribing fails
     */
    private void monitoringDeviceState() throws MqttException {
        String downTopic = "$SYS/brokers/+/clients/#";
        logger.info("连接 MQTT 通道: {url={}, qos: {}, subTopic={}, allowAnonymous={}, userName={}, passWord=******}", mqttUrl, mqttQos, downTopic, mqttAllowAnonymous, mqttUserName);

        MqttHelper stateClient = connect("SUB:sys_subject");

        stateClient.subscribe(new MqttHelper.MqttListener() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
            }

            @Override
            public void connectionLost(Throwable cause) {
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) {
                logger.info("--> topic: {}", topic);
                logger.info("--> message: {}", message);

                // A malformed payload or a DAO failure must not escape the callback.
                // NOTE(review): if MqttListener forwards to Paho's MqttCallback, an exception
                // thrown here would shut the subscribing client down.
                try {
                    handleClientStateChange(topic, message);
                } catch (Exception e) {
                    logger.error("Failed to process client state message, topic: {}", topic, e);
                }
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
            }
        }, downTopic);
    }

    /**
     * Updates the status of the agent named in a client connected / disconnected message.
     * Messages on any other topic are only logged.
     *
     * @param topic topic the message arrived on
     * @param message message whose JSON payload carries the {@code clientid}
     */
    private void handleClientStateChange(String topic, MqttMessage message) {
        boolean online;
        if (topic.indexOf("/connected") > 0) {
            online = true;
        } else if (topic.indexOf("/disconnected") > 0) {
            online = false;
        } else {
            logger.info("未识别的 topic。topic：{}, message: {}", topic, message);
            return;
        }

        JSONObject payload = JSON.parseObject(message.toString());
        String clientId = payload == null ? null : payload.getString("clientid");
        String agentKey = getAgentKey(clientId);
        if (StringUtils.isBlank(agentKey)) {
            return; // client id does not follow the agent naming scheme
        }

        logger.info(online ? "agentKey={} connected" : "agentKey={} disconnected", agentKey);
        AgentStatus status = online ? AgentStatus.ON_LINE : AgentStatus.OFF_LINE;
        agentDao.updateStatusByAgentKey(agentKey, status.getValue());
    }

    /**
     * Subscribes to the upload topic ({@code <base-topic>/UP}) and persists reported step data.
     *
     * @throws MqttException if connecting or subscribing fails
     */
    private void monitoringUpData() throws MqttException {
        String upTopic = mqttBaseTopic + "/UP";
        logger.info("连接 MQTT 通道: {url={}, qos: {}, subTopic={}, allowAnonymous={}, userName={}, passWord=******}", mqttUrl, mqttQos, upTopic, mqttAllowAnonymous, mqttUserName);

        MqttHelper upClient = connect("SUB:server");

        upClient.subscribe(new MqttHelper.MqttListener() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
            }

            @Override
            public void connectionLost(Throwable cause) {
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) {
                logger.info("--> topic: {}", topic);
                logger.info("--> message: {}", message);
                handleAgentReport(message);
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
            }
        }, upTopic);
    }

    /**
     * Parses a reported message into an {@link ApiLog} and stores it when its type is
     * {@code step}. Every other type (for example {@code status}) is discarded. Failures are
     * logged and never propagated to the MQTT callback.
     *
     * @param message message reported by an agent
     */
    private void handleAgentReport(MqttMessage message) {
        try {
            ApiLog apiLog = JSON.parseObject(message.toString(), ApiLog.class);
            if (apiLog == null) {
                logger.warn("Ignoring agent report with an empty payload");
                return;
            }
            // Constant-first comparison so a report without a msg field is simply discarded.
            if ("step".equals(apiLog.getMsg())) {
                apiLogDao.insert(apiLog);
            }
            logger.info("收到 agent 上报数据，处理成功");
        } catch (Exception e) {
            // Trailing throwable argument makes SLF4J print the stack trace as well.
            logger.error("收到 agent 上报数据，保存失败！{}", e.getMessage(), e);
        }
    }

    /**
     * Reconciles the persisted agent status with the broker: every agent is first marked
     * offline, then each client the broker HTTP API reports as connected is marked online.
     *
     * <p>NOTE(review): only the clients contained in this single response are considered;
     * confirm whether the configured endpoint paginates its result.
     */
    private void syncAgentStatus() {
        agentDao.updateStatus(AgentStatus.OFF_LINE.getValue()); // start from "everything offline"

        RestTemplate restTemplate = new RestTemplateBuilder().basicAuthentication(mqttApiKey, mqttSecretKey).build();
        ResponseEntity<String> response = restTemplate.getForEntity(mqttApiGetClientsUrl, String.class);
        JSONObject body = JSON.parseObject(response.getBody());

        JSONArray clientList = body == null ? null : body.getJSONArray("data");
        if (clientList == null) {
            logger.warn("MQTT client list response has no 'data' array; all agents stay offline");
            return;
        }

        for (Object client : clientList) {
            if (!(client instanceof JSONObject)) {
                continue;
            }
            JSONObject clientJson = (JSONObject) client;
            Boolean connected = clientJson.getBoolean("connected");
            String agentKey = getAgentKey(clientJson.getString("clientid"));
            if (StringUtils.isNotBlank(agentKey) && Boolean.TRUE.equals(connected)) {
                agentDao.updateStatusByAgentKey(agentKey, AgentStatus.ON_LINE.getValue());
            }
        }
    }

    /**
     * Extracts the agent key from an MQTT client id.
     *
     * @param clientid client id, expected to contain {@code SUB:<agentKey>_<suffix>}; may be null
     * @return the agent key, or an empty string when {@code clientid} is null or does not match
     */
    private String getAgentKey(String clientid) {
        if (clientid == null) {
            return "";
        }
        Matcher matcher = AGENT_CLIENT_ID_PATTERN.matcher(clientid);
        return matcher.find() ? matcher.group(1) : "";
    }

    /**
     * Creates an MQTT client using the configured QoS, URL and credentials.
     *
     * @param clientIdPrefix prefix of the client id; {@code _<current millis>} is appended
     * @return the helper on which {@code getInstance} has been called
     * @throws MqttException if the client cannot be created or connected
     */
    private static MqttHelper connect(String clientIdPrefix) throws MqttException {
        MqttHelper helper = new MqttHelper();
        helper.setQos(MqttQOS.valueOf(mqttQos));
        String clientId = clientIdPrefix + "_" + System.currentTimeMillis();
        if (Boolean.TRUE.equals(mqttAllowAnonymous)) {
            helper.getInstance(mqttUrl, clientId);
        } else {
            helper.getInstance(mqttUrl, clientId, mqttUserName, mqttPassword);
        }
        return helper;
    }

    /**
     * Publishes a command to an agent over MQTT. The publishing client is cached per agent key
     * and re-created when it is missing or no longer connected.
     *
     * @param agentKey agent key
     * @param topic topic to publish to
     * @param msg command in JSON format
     * @throws MyException if connecting or publishing fails, or the thread is interrupted
     */
    public static void pubCmd(String agentKey, String topic, String msg) {
        try {
            MqttHelper publisher = agentMqttHelperMap.get(agentKey);
            if (publisher == null || !publisher.isConnected()) {
                // NOTE(review): a stale, disconnected helper is replaced here without being
                // closed; confirm whether MqttHelper needs explicit cleanup.
                publisher = connect("PUB:" + agentKey);
                agentMqttHelperMap.put(agentKey, publisher);
            }

            publisher.publish(topic, msg);
        } catch (MqttException e) {
            logger.error("Failed to publish command, agentKey: {}, topic: {}", agentKey, topic, e);
            throw new MyException(e.getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            logger.error("Interrupted while publishing command, agentKey: {}, topic: {}", agentKey, topic, e);
            throw new MyException(e.getMessage());
        }
    }
}
